{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "ff054b1d-f6f7-4c8f-9b50-56d3b5ed1ac9",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "f4658324-c133-4921-b53f-dd6141558f98",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "25/01/17 20:40:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n"
     ]
    }
   ],
   "source": [
    "# Create a SparkSession\n",
    "spark = SparkSession.builder \\\n",
    "    .appName(\"RDD-demo\") \\\n",
    "    .master(\"spark://spark-master:7077\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "97a5a364-a829-4b89-8cb0-6872d0bdafb3",
   "metadata": {},
   "source": [
    "### How to create RDDs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "0e31a7ea-d6fd-49f4-88cd-40c2bda5838a",
   "metadata": {},
   "outputs": [],
   "source": [
    "numbers = [1, 2, 3, 4, 5]\n",
    "rdd = spark.sparkContext.parallelize(numbers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "09f84b37-e1a3-4d90-929e-a60c9063d669",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5]"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Collect action: Retrieve all elements of the RDD\n",
    "rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "fd2ee436-1186-488f-8294-b46ce9c67cac",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create an RDD from a list of tuples\n",
    "data = [(\"Alice\", 25), (\"Bob\", 30), (\"Charlie\", 35), (\"Alice\", 40)]\n",
    "rdd = spark.sparkContext.parallelize(data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "adad76e0-2c10-4a41-b947-89547fe94d35",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 1:=======================================>                   (4 + 2) / 6]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "# Collect action: Retrieve all elements of the RDD\n",
    "print(\"All elements of the rdd: \", rdd.collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "44f98372-bc7a-43d7-b1c1-6d7d102bee29",
   "metadata": {},
   "source": [
    "### RDDs Operation: Actions "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "60353b82-fd00-4e94-b11c-d31e8e005122",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The total number of elements in rdd:  4\n"
     ]
    }
   ],
   "source": [
    "# Count action: Count the number of elements in the RDD\n",
    "count = rdd.count()\n",
    "print(\"The total number of elements in rdd: \", count)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "456dbe75-0182-47ba-aaf9-34ad9ab06f55",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The first element of the rdd:  ('Alice', 25)\n"
     ]
    }
   ],
   "source": [
    "# First action: Retrieve the first element of the RDD\n",
    "first_element = rdd.first()\n",
    "print(\"The first element of the rdd: \", first_element)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "979ae249-efb6-4b8b-b8e5-0cddea496ff9",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]\n"
     ]
    }
   ],
   "source": [
    "# Take action: Retrieve the n elements of the RDD\n",
    "taken_elements = rdd.take(2)\n",
    "print(\"The first two elements of the rdd: \", taken_elements)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "38bd8336-338e-4876-96e0-5e9aa19b5b36",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Foreach action: Print each element of the RDD\n",
    "rdd.foreach(lambda x: print(x))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c30f740c-f5fd-48d1-9e8b-a0f78caab408",
   "metadata": {},
   "source": [
    "### RDDs Operation: Transformations "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "b3f7d23b-f246-4797-97ff-a56766657d53",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Map transformation: Convert name to uppercase\n",
    "mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "78d8ef21-d4d1-4361-b448-6c23e251e8f0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "rdd with uppercease name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]\n"
     ]
    }
   ],
   "source": [
    "result = mapped_rdd.collect()\n",
    "print(\"rdd with uppercease name: \", result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "2211dbf5-64be-4966-bcb4-e11fdbc9363f",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Charlie', 35), ('Alice', 40)]"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Filter transformation: Filter records where age is greater than 30\n",
    "filtered_rdd = rdd.filter(lambda x: x[1] > 30)\n",
    "filtered_rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "325614c2-ede1-45f4-9818-6e89cb72e044",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Alice', 65), ('Charlie', 35), ('Bob', 30)]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# ReduceByKey transformation: Calculate the total age for each name\n",
    "reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)\n",
    "reduced_rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "1a5e96bb-f8ce-4239-949f-184648b60ae7",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# SortBy transformation: Sort the RDD by age in descending order\n",
    "sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)\n",
    "sorted_rdd.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d51d11e9-260f-421d-94d9-350e5c6146bb",
   "metadata": {},
   "source": [
    "### Save RDDs to text file and read RDDs from text file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "c8389f05-063a-4a52-beb2-efc4e50daa6a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Save action: Save the RDD to a text file\n",
    "rdd.saveAsTextFile(\"output.txt\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "2a557485-8d1b-431c-aba5-dcbe21f3970d",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[]"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# create rdd from text file\n",
    "rdd_text = spark.sparkContext.textFile(\"output.txt\")\n",
    "rdd_text.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c799ec33-2a47-4d8e-b239-92f25c8e7a37",
   "metadata": {},
   "source": [
    "### Shut down Spark Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "b501edd4-5d9a-4ffe-8cbb-abcf077868c4",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "695cb724-4691-441e-97f1-7320c109a62f",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "pyspark",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
